package com.flink.test;

import com.flink.examples.vo.Order;
import com.google.gson.Gson;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

/**
 * @Description 向kafka发送测试模拟订单数据
 * @Author JL
 * @Date 2020/10/10
 * @Version V1.0
 */
public class OrderToKafkaMsg {
    static String upperNames = "赵,钱,孙,李,周,吴,郑,王,冯,陈,褚,卫,蒋,沈,韩,杨,朱,秦,尤,许,何,吕,施,张,孔,曹,严,华,金," +
            "魏,陶,姜,戚,谢,邹,喻,柏,水,窦,章,云,苏,潘,葛,奚,范,彭,郎,鲁,韦,昌,马,苗,凤,花,方,俞,任,袁,柳,酆,鲍,史,唐," +
            "费,廉,岑,薛,雷,贺,倪,汤,滕,殷,罗,毕,郝,邬,安,常,乐,于,时,傅,皮,卞,齐,康,伍,余,元,卜,顾,孟,平,黄,和,穆,萧";
    static String upperNums = "壹,贰,叁,肆,伍,陆,柒,捌,玖,拾,佰,仟,万,亿,元,角,分,零";
    static String [] goodsTypes = new String[]{"数码", "美食", "时尚", "家居", "运动", "母婴", "大杂烩", "包包"};

    /**
     * 订单信息：
     * 订单ID、用户名称、下单时间、总价
     * @param args
     */
    public static void main(String[] args) throws Exception {
        //生产者发送消息
        KafkaUtils.KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("192.168.110.35", 9092);
        String topic = "order_behavior";
        String [] userNames = StringUtils.split(upperNames, ",");
        String [] nums = StringUtils.split(upperNums, ",");
        //模拟不停电创建模拟订单
        int i = 0;
        while(true){
            String orderId = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS") + RandomUtils.nextInt(1000 , 9999);
            String userName  = userNames[RandomUtils.nextInt(0, userNames.length)] + nums[RandomUtils.nextInt(0, nums.length)];
            String goodsType = goodsTypes[RandomUtils.nextInt(0, goodsTypes.length)];
            Double price = RandomUtils.nextDouble(20.00, 1000.00);
            Integer num = RandomUtils.nextInt(1, 10);
            BigDecimal priceBig = new BigDecimal(price);
            price = priceBig.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            //创建总价
            BigDecimal totalPriceBig = new BigDecimal(price * num);
            Double totalPrice = totalPriceBig.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            //订单生成时间
            Long orderTimeSeries = System.currentTimeMillis();
            String orderTime = DateFormatUtils.format(orderTimeSeries, "yyyy-MM-dd HH:mm:ss");

            //订单ID、用户名称、用户性别、商品名称、商品类型、生产商、下单时间、单价、数量、总价、订单状态、 收货地址、联系方式
            Order order = new Order(orderId, userName, null, null, goodsType, null, orderTime, orderTimeSeries , price, num, totalPrice, null, null, null);
            String orderJson = new Gson().toJson(order);
            //向kafka队列发送数据
            kafkaStreamServer.sendMsg(topic, orderJson);
            i++;
            //模拟不同时间段的消费量
            int startInt = 100;
            //线程休眠
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(startInt, 3000));
        }
    }
}
